{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Verifying the MLOps environment on GCP\n",
    "\n",
    "This notebook verifies the MLOps environment provisioned on GCP\n",
    "1. Test using the local MLflow server in AI Notebooks instance in log entries to the Cloud SQL\n",
    "2. Test deploying and running an Airflow workflow on Composer that uses MLflow server on GKE to log entries to the Cloud SQL"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Running a local MLflow experiment\n",
    "We implement a simple Scikit-learn model training routine, and examine the logged entries in Cloud SQL and produced articats in Cloud Storage through MLflow tracking."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import re\n",
    "import mlflow\n",
    "import mlflow.sklearn\n",
    "import numpy as np\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "import pymysql\n",
    "from IPython.core.display import display, HTML"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mlflow_tracking_uri = mlflow.get_tracking_uri()\n",
    "MLFLOW_EXPERIMENTS_URI = os.environ['MLFLOW_EXPERIMENTS_URI']\n",
    "\n",
    "print(\"MLflow tracking server URI: {}\".format(mlflow_tracking_uri))\n",
    "print(\"MLflow artifacts store root: {}\".format(MLFLOW_EXPERIMENTS_URI))\n",
    "print(\"MLflow SQL connction name: {}\".format(os.environ['MLFLOW_SQL_CONNECTION_NAME']))\n",
    "print(\"MLflow SQL connction string: {}\".format(os.environ['MLFLOW_SQL_CONNECTION_STR']))\n",
    "print(\"Cloud Composer name: {}\".format(os.environ['MLOPS_COMPOSER_NAME']))\n",
    "print(\"Cloud Composer instance region: {}\".format(os.environ['MLOPS_REGION']))\n",
    "\n",
    "display(HTML('<hr>You can check results of this test in MLflow and GCS folder:'))\n",
    "display(HTML('<h4><a href=\"{}\" rel=\"noopener noreferrer\" target=\"_blank\">Click to open MLflow UI</a></h4>'.format(os.environ['MLFLOW_TRACKING_EXTERNAL_URI'])))\n",
    "display(HTML('<h4><a href=\"https://console.cloud.google.com/storage/browser/{}\" rel=\"noopener noreferrer\" target=\"_blank\">Click to open GCS folder</a></h4>'.format(MLFLOW_EXPERIMENTS_URI.replace('gs://',''))))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.1. Training a simple Scikit-learn model from Notebook environment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "experiment_name = \"notebooks-test\"\n",
    "mlflow.set_experiment(experiment_name)\n",
    "\n",
    "with mlflow.start_run(nested=True):\n",
    "    X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)\n",
    "    y = np.array([0, 0, 1, 1, 1, 0])\n",
    "    lr = LogisticRegression()\n",
    "    lr.fit(X, y)\n",
    "    score = lr.score(X, y)\n",
    "    print(\"Score: %s\" % score)\n",
    "    mlflow.log_metric(\"score\", score)\n",
    "    mlflow.sklearn.log_model(lr, \"model\")\n",
    "    print(\"Model saved in run %s\" % mlflow.active_run().info.run_uuid)\n",
    "    current_model=mlflow.get_artifact_uri('model')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.2. Query the Mlfow entries from Cloud SQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sqlauth=re.search('mysql\\\\+pymysql://(?P<user>.*):(?P<psw>.*)@127.0.0.1:3306/mlflow', os.environ['MLFLOW_SQL_CONNECTION_STR'],re.DOTALL)\n",
    "connection = pymysql.connect(\n",
    "    host='127.0.0.1',\n",
    "    port=3306,\n",
    "    database='mlflow',\n",
    "    user=sqlauth.group('user'),\n",
    "    passwd=sqlauth.group('psw')\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### List tables\n",
    "You should see a list of table names like 'experiments','metrics','model_versions','runs'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor = connection.cursor()   \n",
    "cursor.execute(\"SHOW TABLES\")\n",
    "for entry in cursor:\n",
    "    print(entry[0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Retrieve experiment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor.execute(\"SELECT * FROM experiments where name='{}' ORDER BY experiment_id desc LIMIT 1\".format(experiment_name))\n",
    "if cursor.rowcount == 0:\n",
    "    print(\"Experiment not found\")\n",
    "else:\n",
    "    experiment_id = list(cursor)[0][0]\n",
    "    print(\"'{}' experiment ID: {}\".format(experiment_name, experiment_id))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Query runs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor.execute(\"SELECT * FROM runs where experiment_id={} ORDER BY start_time desc LIMIT 1\".format(experiment_id))\n",
    "if cursor.rowcount == 0:\n",
    "    print(\"No runs found\")\n",
    "else:\n",
    "    entity=list(cursor)[0]\n",
    "    run_uuid = entity[0]\n",
    "    print(\"Last run id of '{}' experiment is: {}\\n\".format(experiment_name, run_uuid))\n",
    "    print(entity)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Query metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor.execute(\"SELECT * FROM metrics where run_uuid = '{}'\".format(run_uuid))\n",
    "for entry in cursor:\n",
    "    print(entry)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.3. List the artifacts in Cloud Storage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil ls {current_model}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Submitting a workflow to Composer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We implement a one-step Airflow workflow that trains a Scikit-learn model, and examine the logged entries in Cloud SQL and produced articats in Cloud Storage through MLflow tracking."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "COMPOSER_NAME=os.environ['MLOPS_COMPOSER_NAME']\n",
    "REGION=os.environ['MLOPS_REGION']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.1. Writing the Airflow workflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile test-sklearn-mlflow.py\n",
    "\n",
    "import airflow\n",
    "import mlflow\n",
    "import mlflow.sklearn\n",
    "import numpy as np\n",
    "from datetime import timedelta\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "from airflow.operators import PythonOperator\n",
    "\n",
    "def train_model(**kwargs):\n",
    "\n",
    "    print(\"Train lr model step started...\")\n",
    "    print(\"MLflow tracking uri: {}\".format(mlflow.get_tracking_uri()))\n",
    "    mlflow.set_experiment(\"airflow-test\")\n",
    "    with mlflow.start_run(nested=True):\n",
    "        X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)\n",
    "        y = np.array([0, 0, 1, 1, 1, 0])\n",
    "        lr = LogisticRegression()\n",
    "        lr.fit(X, y)\n",
    "        score = lr.score(X, y)\n",
    "        print(\"Score: %s\" % score)\n",
    "        mlflow.log_metric(\"score\", score)\n",
    "        mlflow.sklearn.log_model(lr, \"model\")\n",
    "        print(\"Model saved in run %s\" % mlflow.active_run().info.run_uuid)\n",
    "    print(\"Train lr model step finished.\")\n",
    "    \n",
    "default_args = {\n",
    "    'retries': 1,\n",
    "    'start_date': airflow.utils.dates.days_ago(0)\n",
    "}\n",
    "\n",
    "with airflow.DAG(\n",
    "    'test_sklearn_mlflow',\n",
    "    default_args=default_args,\n",
    "    schedule_interval=None,\n",
    "    dagrun_timeout=timedelta(minutes=20)) as dag:\n",
    "    \n",
    "    train_model_op = PythonOperator(\n",
    "        task_id='train_sklearn_model',\n",
    "        provide_context=True,\n",
    "        python_callable=train_model\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.2. Uploading the Airflow workflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud composer environments storage dags import \\\n",
    "  --environment {COMPOSER_NAME}  --location {REGION} \\\n",
    "  --source test-sklearn-mlflow.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud composer environments storage dags list \\\n",
    "  --environment {COMPOSER_NAME}  --location {REGION}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.3. Triggering the workflow"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Please wait for 30-60 seconds before triggering the workflow at the first Airflow Dag import"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud composer environments run {COMPOSER_NAME} \\\n",
    "    --location {REGION} unpause -- test_sklearn_mlflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud composer environments run {COMPOSER_NAME} \\\n",
    "    --location {REGION} trigger_dag -- test_sklearn_mlflow"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.4. Query the MLfow entries from Cloud SQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor = connection.cursor()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Retrieve experiment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "experiment_name = \"airflow-test\"\n",
    "cursor.execute(\"SELECT * FROM experiments where name='{}' ORDER BY experiment_id desc LIMIT 1\".format(experiment_name))\n",
    "if cursor.rowcount == 0:\n",
    "    print(\"Experiment not found\")\n",
    "else:\n",
    "    experiment_id = list(cursor)[0][0]\n",
    "    print(\"'{}' experiment ID: {}\".format(experiment_name, experiment_id))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Query runs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor.execute(\"SELECT * FROM runs where experiment_id={} ORDER BY start_time desc LIMIT 1\".format(experiment_id))\n",
    "if cursor.rowcount == 0:\n",
    "    print(\"No runs found\")\n",
    "else:\n",
    "    entity=list(cursor)[0]\n",
    "    run_uuid = entity[0]\n",
    "    print(\"Last run id of '{}' experiment is: {}\\n\".format(experiment_name, run_uuid))\n",
    "    print(entity)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Query metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor.execute(\"SELECT * FROM metrics where run_uuid = '{}'\".format(run_uuid))\n",
    "if cursor.rowcount == 0:\n",
    "    print(\"No metrics found\")\n",
    "else:\n",
    "    for entry in cursor:\n",
    "        print(entry)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.5. List the artifacts in Cloud Storage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil ls {MLFLOW_EXPERIMENTS_URI}/{experiment_id}/{run_uuid}/artifacts/model"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "r-cpu.3-6.m49",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/r-cpu.3-6:m49"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}